Tags

java types apache-flink

Answers (1)

February 8, 2026 Score: 0 Rep: 50,101 Quality: Medium Completeness: 100%

Preface: This answer explains what is happening based on the implementation. I don't know if this behavior is defined in the language specification. Also, I experimented with Java 25; it's possible things have changed, even more than once, since Java 8.


Judging from the source code of DataStream#keyBy(KeySelector), when the argument is a lambda expression or method reference, Flink tries to resolve the key type by obtaining the argument's java.lang.invoke.SerializedLambda form and inspecting its implementation method. The problem stems from how Java, or at least javac, implements lambda expressions differently from method references.

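Note that Flink's KeySelector extends java.io.Serializable. That is what makes this approach possible: only a lambda expression or method reference whose target type is serializable can be converted to a SerializedLambda.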
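Here is a minimal sketch of why that matters. It assumes OpenJDK's current lambda implementation, in which only a lambda whose target type is serializable gets a generated writeReplace() method, and that method is what hands out the SerializedLambda. The intersection cast (Runnable & Serializable) stands in for KeySelector. WriteReplaceDemo and declaresWriteReplace are hypothetical names used only for illustration.

```java
import java.io.Serializable;

class WriteReplaceDemo {

    // True if the runtime class of 'instance' itself declares writeReplace(),
    // the hook a serializable lambda uses to hand out its SerializedLambda form.
    static boolean declaresWriteReplace(Object instance) {
        try {
            instance.getClass().getDeclaredMethod("writeReplace");
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Runnable plain = () -> System.out.println("plain");
        // The intersection cast makes the target type Serializable, like KeySelector
        Runnable serializable = (Runnable & Serializable) () -> System.out.println("serializable");

        System.out.println(declaresWriteReplace(plain));        // false
        System.out.println(declaresWriteReplace(serializable)); // true
    }
}
```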

For example, here's some code that inspects a SerializedLambda in a similar way to Flink. Note that it requires Java 25+ to run (at least without preview features enabled).

```java
package com.example;

import java.io.Serializable;
import java.lang.constant.ConstantDescs;
import java.lang.invoke.MethodHandleInfo;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Main {

    // Similar to Flink's KeySelector
    @FunctionalInterface
    interface KeySelector<T, K> extends Serializable {

        K getKey(T value);
    }

    // Similar to your TransformationService implementation
    static class TestFunction implements Function<String, String> {

        @Override
        public String apply(String s) {
            // Implementation not needed for example
            throw new UnsupportedOperationException();
        }
    }

    record LambdaExecutable(List<Type> paramTypes, Type returnType, Executable executable) {

        LambdaExecutable(Executable executable) {
            this(executable, getReturnType(executable, true));
        }

        private LambdaExecutable(Executable executable, Type returnType) {
            this(List.of(executable.getGenericParameterTypes()), returnType, executable);
        }
    }

    static void main() throws Exception {
        Function<String, String> function = new TestFunction();
        test("Lambda Expression", value -> function.apply(value));
        test("Method Reference ", function::apply);
    }

    // Prints the SerializedLambda and LambdaExecutable created from 'selector', as well
    // as the "key type" as inferred from 'selector'.
    static void test(String label, KeySelector<String, String> selector)
            throws IllegalAccessException, InvocationTargetException, ClassNotFoundException {
        System.out.printf("########## %s ##########%n", label);

        SerializedLambda lambda = toSerializedLambda(selector);
        System.out.printf(
                """
                SerializedLambda:
                    Capturing class             = %s
                    Functional interface method = %s.%s%s
                    Instantiated method type    = %s
                    Implementation              = %s %s.%s%s
                    Arguments captured          = %d
                """,
                lambda.getCapturingClass(),
                lambda.getFunctionalInterfaceClass(),
                lambda.getFunctionalInterfaceMethodName(),
                lambda.getFunctionalInterfaceMethodSignature(),
                lambda.getInstantiatedMethodType(),
                MethodHandleInfo.referenceKindToString(lambda.getImplMethodKind()),
                lambda.getImplClass(),
                lambda.getImplMethodName(),
                lambda.getImplMethodSignature(),
                lambda.getCapturedArgCount());

        LambdaExecutable implementation = getImplementation(lambda);
        System.out.printf(
                """
                LambdaExecutable:
                    Parameter types = %s
                    Return type     = %s
                    Executable      = %s
                """,
                implementation.paramTypes(),
                implementation.returnType(),
                implementation.executable());

        System.out.printf(
                """
                Key:
                    Type  = %s
                    Value = %s
                """,
                implementation.returnType().getClass().getName(),
                implementation.returnType());
        System.out.println();
    }

    // Serializable lambdas and method references get a private writeReplace() method
    // that returns their SerializedLambda form; call it reflectively.
    static SerializedLambda toSerializedLambda(Object instance)
            throws IllegalAccessException, InvocationTargetException {
        for (Class<?> cls = instance.getClass(); cls != null; cls = cls.getSuperclass()) {
            try {
                Method writeReplace = cls.getDeclaredMethod("writeReplace");
                writeReplace.setAccessible(true);
                if (writeReplace.invoke(instance) instanceof SerializedLambda lambda) {
                    return lambda;
                }
            } catch (NoSuchMethodException _) {
                // Not declared on this class; try the superclass
            }
        }
        throw new IllegalArgumentException("Not a serializable lambda.");
    }

    static LambdaExecutable getImplementation(SerializedLambda lambda) throws ClassNotFoundException {
        String implClsName = lambda.getImplClass();
        String implMtdName = lambda.getImplMethodName();
        String implMtdSig = lambda.getImplMethodSignature();

        ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
        Class<?> implCls = Class.forName(implClsName.replace('/', '.'), true, clsLoader);

        // Constructors show up with the method name "<init>"
        Stream<? extends Executable> execs = implMtdName.equals(ConstantDescs.INIT_NAME)
                ? Stream.of(implCls.getDeclaredConstructors())
                : Stream.of(implCls.getDeclaredMethods());

        return execs
                .filter(exec -> getSignature(exec).equals(implMtdSig))
                .findFirst()
                .map(LambdaExecutable::new)
                .orElseThrow(() -> new IllegalStateException("Implementation method not found."));
    }

    // Builds a descriptor-style signature from the erased parameter and return types
    static String getSignature(Executable exec) {
        Class<?>[] pTypes = exec.getParameterTypes();
        Class<?> rType = (Class<?>) getReturnType(exec, false);
        return Stream.of(pTypes)
                .map(Class::descriptorString)
                .collect(Collectors.joining("", "(", ")" + rType.descriptorString()));
    }

    // Guaranteed to return Class if 'generic' is false
    static Type getReturnType(Executable exec, boolean generic) {
        return switch (exec) {
            case Constructor<?> ctor -> ctor.getDeclaringClass();
            case Method mtd when generic -> mtd.getGenericReturnType();
            case Method mtd -> mtd.getReturnType();
        };
    }
}
```

If you look at the bytecode of that class with javap -v, you'll see the following bootstrap methods.

Lambda expression

For value -> function.apply(value)

0: #423 REF_invokeStatic java/lang/invoke/LambdaMetafactory.altMetafactory:(Ljava/lang/invoke/MethodHandles$Lookup;Ljava/lang/String;Ljava/lang/invoke/MethodType;[Ljava/lang/Object;)Ljava/lang/invoke/CallSite;
  Method arguments:
    #392 (Ljava/lang/Object;)Ljava/lang/Object;
    #393 REF_invokeStatic com/example/Main.lambda$main$214a3767$1:(Ljava/util/function/Function;Ljava/lang/String;)Ljava/lang/String;
    #396 (Ljava/lang/String;)Ljava/lang/String;
    #397 5
    #398 0

The implementation method is com/example/Main.lambda$main$214a3767$1, a private static method that javac generated from the lambda body. Its return type is java.lang.String.

Method reference

For function::apply

1: #423 REF_invokeStatic java/lang/invoke/LambdaMetafactory.altMetafactory:(Ljava/lang/invoke/MethodHandles$Lookup;Ljava/lang/String;Ljava/lang/invoke/MethodType;[Ljava/lang/Object;)Ljava/lang/invoke/CallSite;
  Method arguments:
    #392 (Ljava/lang/Object;)Ljava/lang/Object;
    #399 REF_invokeInterface java/util/function/Function.apply:(Ljava/lang/Object;)Ljava/lang/Object;
    #396 (Ljava/lang/String;)Ljava/lang/String;
    #397 5
    #398 0

The implementation method is java/util/function/Function.apply itself. Its erased return type is java.lang.Object, and its generic return type is the type variable R (declared by Function<T, R>, with an implicit upper bound of Object).
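To see this with plain reflection, here is a small sketch (ApplyReturnTypeDemo and functionApply are hypothetical names). It shows that the only generic information on Function.apply is the type variable R and the fact that Function declares it. Nothing there records what R was instantiated with at a particular call site.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.Arrays;
import java.util.function.Function;

class ApplyReturnTypeDemo {

    // Function.apply(T) erases to apply(Object), so that is the lookup key
    static Method functionApply() {
        try {
            return Function.class.getMethod("apply", Object.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Method apply = functionApply();

        // Erased return type, i.e. what the method descriptor records
        System.out.println(apply.getReturnType()); // class java.lang.Object

        // Generic return type: the type variable R declared by Function<T, R>
        Type generic = apply.getGenericReturnType();
        if (generic instanceof TypeVariable<?> r) {
            System.out.println(r.getName());                    // R
            System.out.println(Arrays.toString(r.getBounds())); // [class java.lang.Object]
            System.out.println(r.getGenericDeclaration());      // interface java.util.function.Function
        }
    }
}
```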

Output

You can see how these differences show up when you run the example:

########## Lambda Expression ##########
SerializedLambda:
    Capturing class             = com/example/Main
    Functional interface method = com/example/Main$KeySelector.getKey(Ljava/lang/Object;)Ljava/lang/Object;
    Instantiated method type    = (Ljava/lang/String;)Ljava/lang/String;
    Implementation              = invokeStatic com/example/Main.lambda$main$214a3767$1(Ljava/util/function/Function;Ljava/lang/String;)Ljava/lang/String;
    Arguments captured          = 1
LambdaExecutable:
    Parameter types = [interface java.util.function.Function, class java.lang.String]
    Return type     = class java.lang.String
    Executable      = private static java.lang.String com.example.Main.lambda$main$214a3767$1(java.util.function.Function,java.lang.String)
Key:
    Type  = java.lang.Class
    Value = class java.lang.String

########## Method Reference  ##########
SerializedLambda:
    Capturing class             = com/example/Main
    Functional interface method = com/example/Main$KeySelector.getKey(Ljava/lang/Object;)Ljava/lang/Object;
    Instantiated method type    = (Ljava/lang/String;)Ljava/lang/String;
    Implementation              = invokeInterface java/util/function/Function.apply(Ljava/lang/Object;)Ljava/lang/Object;
    Arguments captured          = 1
LambdaExecutable:
    Parameter types = [T]
    Return type     = R
    Executable      = public abstract java.lang.Object java.util.function.Function.apply(java.lang.Object)
Key:
    Type  = sun.reflect.generics.reflectiveObjects.TypeVariableImpl
    Value = R

Flink uses the return type of the implementation method to determine the key type. For the lambda expression, that return type is java.lang.String. For the method reference, it is the type variable R, and nothing about Function.apply says what R was instantiated with at this call site. Flink therefore can't resolve the key type, so it throws an exception.

Why not query generic interfaces?

For a class that implements a generic interface, you can use Class#getGenericInterfaces() to get the interface as a ParameterizedType, which gives you access to the actual type arguments. Unfortunately, the class created for a lambda expression or method reference doesn't report the functional interface as a ParameterizedType; it just gives you the raw Class.
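The sketch below compares the cases, using a KeySelector-like interface as in the example above. GenericInterfacesDemo, UpperCaseSelector, and keySelectorTypeArgs are hypothetical names. The named class and the anonymous class both expose KeySelector<String, String>, while the lambda and the method reference only expose the raw interface.

```java
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

class GenericInterfacesDemo {

    // Stand-in for Flink's KeySelector, as in the example above
    @FunctionalInterface
    interface KeySelector<T, K> extends Serializable {
        K getKey(T value);
    }

    // Named implementation with concrete type arguments
    static class UpperCaseSelector implements KeySelector<String, String> {
        @Override
        public String getKey(String value) {
            return value.toUpperCase();
        }
    }

    // Returns the actual type arguments recorded for KeySelector on the object's class,
    // or an empty list when the class only reports the raw KeySelector interface.
    static List<Type> keySelectorTypeArgs(Object instance) {
        for (Type iface : instance.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType pt && pt.getRawType() == KeySelector.class) {
                return List.of(pt.getActualTypeArguments());
            }
        }
        return List.of();
    }

    public static void main(String[] args) {
        KeySelector<String, String> named = new UpperCaseSelector();
        KeySelector<String, String> anonymous = new KeySelector<String, String>() {
            @Override
            public String getKey(String value) {
                return value.toUpperCase();
            }
        };
        KeySelector<String, String> lambda = value -> value.toUpperCase();
        KeySelector<String, String> reference = String::toUpperCase;

        System.out.println(keySelectorTypeArgs(named));     // [class java.lang.String, class java.lang.String]
        System.out.println(keySelectorTypeArgs(anonymous)); // [class java.lang.String, class java.lang.String]
        System.out.println(keySelectorTypeArgs(lambda));    // []
        System.out.println(keySelectorTypeArgs(reference)); // []
    }
}
```

This is also why the anonymous or named class workaround in the last section works: such a class carries its type arguments in its class file.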

Possible solution on Flink's end?

That said, even the method reference records the instantiated method type. The bytecode shows it is (Ljava/lang/String;)Ljava/lang/String; in both cases, and that information can be queried via SerializedLambda#getInstantiatedMethodType().

Perhaps Flink could be updated to use that information instead of relying solely on the return type of the implementation method.
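Here is a hypothetical sketch of that idea; it is not Flink's API, and InstantiatedTypeDemo and keyType are illustrative names. It parses the instantiated method type with MethodType.fromMethodDescriptorString and takes its return type. One caveat is that a method descriptor is erased, so this only recovers the raw class (for example List rather than List<String>).

```java
import java.io.Serializable;
import java.lang.invoke.MethodType;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Function;

class InstantiatedTypeDemo {

    // Stand-in for Flink's KeySelector; serializable, so lambdas targeting it get writeReplace()
    @FunctionalInterface
    interface KeySelector<T, K> extends Serializable {
        K getKey(T value);
    }

    // Obtains the SerializedLambda via the generated writeReplace() method
    // (assumes the lambda class declares it directly, as OpenJDK's lambda classes do)
    static SerializedLambda serialized(Object lambda) {
        try {
            Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return (SerializedLambda) writeReplace.invoke(lambda);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Not a serializable lambda", e);
        }
    }

    // Hypothetical helper: derives the (erased) key type from the instantiated
    // method type instead of the implementation method's return type
    static Class<?> keyType(KeySelector<?, ?> selector) {
        SerializedLambda lambda = serialized(selector);
        MethodType instantiated = MethodType.fromMethodDescriptorString(
                lambda.getInstantiatedMethodType(),
                selector.getClass().getClassLoader());
        return instantiated.returnType();
    }

    public static void main(String[] args) {
        Function<String, String> function = String::trim;
        KeySelector<String, String> viaLambda = value -> function.apply(value);
        KeySelector<String, String> viaReference = function::apply;

        System.out.println(keyType(viaLambda));    // class java.lang.String
        System.out.println(keyType(viaReference)); // class java.lang.String
    }
}
```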

Solution on your end

You can get your code to work in at least the following ways:

  1. Continue passing a lambda expression instead of a method reference.

  2. Use an anonymous class, or even a named class, that implements KeySelector with non-variable type arguments.

  3. Use the DataStream#keyBy(KeySelector, TypeInformation) overload, which lets you keep using a method reference because you supply the key type explicitly.